package com.usfot;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

/**
 * Created by liyanxin on 2015/3/17.
 */
public class Queue implements Watcher {

    private static final String addr = "127.0.0.1:2181";
    private String root;
    private ZooKeeper zk = null;
    private Integer mutex;

    /**
     * Constructor of producer-consumer queue
     *
     * @param root
     */
    public Queue(String root) {
        this.root = root;
        try {
            //连接zk服务器
            zk = new ZooKeeper(addr, 10 * 10000, this);
        } catch (IOException e) {
            e.printStackTrace();
        }
        mutex = new Integer(-1);
        // Create ZK node name
        if (zk != null) {
            try {
                //建立根目录节点
                Stat s = zk.exists(root, false);
                if (s == null) {
                    zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                            CreateMode.PERSISTENT);
                }
            } catch (KeeperException e) {
                System.out
                        .println("Keeper exception when instantiating queue: "
                                + e.toString());
            } catch (InterruptedException e) {
                System.out.println("Interrupted exception");
            }
        }
    }

    /**
     * 当znode上事件触发，唤醒相应的等待线程
     *
     * @param event
     */
    public void process(WatchedEvent event) {
        synchronized (mutex) {
            //System.out.println("Process: " + event.getType());
            mutex.notify();
        }
    }

    /**
     * Add element to the queue.
     *
     * @param i
     * @return
     */
    boolean produce(int i) throws KeeperException, InterruptedException {
        ByteBuffer b = ByteBuffer.allocate(4);
        byte[] value;

        // Add child with value i
        b.putInt(i);
        value = b.array();
        zk.create(root + "/element" + i, value, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT);

        System.out.println("=========Produce znode=========");
        return true;
    }


    /**
     * Remove first element from the queue.
     *
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    int consume() throws KeeperException, InterruptedException {
        int retvalue = -1;
        Stat stat = null;

        // Get the first element available
        while (true) {
            synchronized (mutex) {
                List<String> list = zk.getChildren(root, true);
                if (list.size() == 0) {
                    System.out.println("Going to wait");
                    mutex.wait();
                } else {
                    Integer min = new Integer(list.get(0).substring(7));
                    for (String s : list) {
                        Integer tempValue = new Integer(s.substring(7));
                        //System.out.println("Temporary value: " + tempValue);
                        if (tempValue < min) min = tempValue;
                    }
                    System.out.println("Temporary VALUE: " + root + "/element" + min);
                    byte[] b = zk.getData(root + "/element" + min,
                            false, stat);
                    zk.delete(root + "/element" + min, 0);
                    ByteBuffer buffer = ByteBuffer.wrap(b);
                    retvalue = buffer.getInt();

                    return retvalue;
                }
            }
        }
    }
}
